Followable Apps Documentation
Table of Contents
- What is a Followable App?
- When to Use Followable Apps
- Where to Use Followable Apps
- How to Use Followable Apps
- Examples
- Best Practices
- Troubleshooting
What is a Followable App?
A Followable App is a specialized type of Corva backend application that can be followed by other applications, creating a chain reaction of app executions. When a followable app produces data, it automatically triggers the execution of any apps that are configured to follow it.
Key Characteristics:
- Data Producer: Followable apps must produce data to trigger following apps
- Chain Reaction: They enable cascading workflows where one app's output becomes another app's input
- App Type Restrictions: Only stream and scheduled apps can be made followable
- Real-time Processing: They facilitate real-time data processing pipelines
Core Concept:
Followable App → Produces Data → Triggers Following Apps → Chain Reaction
The followable app architecture enables building complex data processing workflows where apps can depend on the output of other apps, creating sophisticated real-time analytics pipelines.
When to Use Followable Apps
Primary Use Cases:
Multi-Stage Data Processing
- When you need to process data through multiple computational stages
- Each stage depends on the output of the previous stage
- Example: Raw sensor data → Cleaned data → Analytics → Alerts
Real-time Analytics Pipelines
- When building complex analytics workflows
- Multiple apps need to react to the same data events
- Real-time calculations that depend on processed results
Event-Driven Architectures
- When you want to trigger multiple processes from a single data event
- Decoupling data producers from data consumers
- Building reactive systems that respond to data changes
Data Enrichment Workflows
- When raw data needs multiple enrichment steps
- Each enrichment step can be handled by a specialized app
- Final enriched data is used by downstream applications
Scenarios Where Followable Apps Excel:
- Drilling Operations: Raw drilling data → Processed parameters → Alerts/Recommendations
- Well Performance: Production data → Calculations → Performance metrics → Optimization suggestions
- Equipment Monitoring: Sensor data → Health indicators → Predictive maintenance alerts
- Quality Control: Raw measurements → Statistical analysis → Quality scores → Reports
Where to Use Followable Apps
Platform Context:
Followable apps operate within the Corva DevCenter ecosystem and are built using the Corva Python SDK.
Infrastructure Requirements:
Corva DevCenter Account
- Backend app development environment
- Access to datasets and APIs
- App deployment and management tools
Data Sources
- Real-time data streams (for stream apps)
- Scheduled data intervals (for scheduled apps)
- Proper data ingestion setup
Datasets
- Write permissions to target datasets
- Proper dataset configuration for data storage
- Understanding of data schema requirements
Architecture Placement:
Data Sources → Stream/Scheduled Apps (Followable) → Following Apps → End Users/Systems
Integration Points:
- Upstream: Data ingestion systems, sensors, databases
- Downstream: Visualization tools, alert systems, reporting applications
- Lateral: Other Corva apps, external APIs, notification systems
How to Use Followable Apps
Step 1: Create a Followable App
Basic Structure:
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Process your data
    processed_data = process_raw_data(event)

    # Prepare data for storage and following apps
    data = [
        {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': processed_data['timestamp'],
            'data': processed_data['results'],
        }
    ]

    # Store data and trigger following apps (recommended method)
    api.insert_data(
        provider='your-provider',
        dataset='your-dataset',
        data=data,
        produce=True  # This flag enables following apps to be triggered
    )

    return "Data processed and published successfully"
Step 2: Configure the App as Followable
In the Corva DevCenter:
- Open your app page
- Navigate to Settings tab
- Activate "Followable App" toggle
- Select a dataset (must have write permissions)
- Save configuration
Step 3: Create Following Apps
Scheduled Following App:
from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def following_scheduled_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    # Query data from the followable app's dataset
    data = api.get_dataset(
        provider='your-provider',
        dataset='followable-app-dataset',
        query={
            'asset_id': event.asset_id,
            'company_id': event.company_id
        },
        sort={'timestamp': 1},
        limit=100,
    )

    # Process the data
    results = analyze_followable_data(data)

    # Store results or trigger additional actions
    return results
Stream Following App:
from corva import Api, Cache, StreamTimeEvent, stream

@stream
def following_stream_app(event: StreamTimeEvent, api: Api, cache: Cache):
    # Data is automatically provided in the event
    for record in event.records:
        # Process each data record from the followable app
        process_followable_record(record.data)

    return "Stream processing completed"
Step 4: Configure Following Relationships
In the Corva DevCenter when creating a following app:
- Choose same Segment and Log Type as the followable app
- Select the followable app from "Select the data that your app will follow"
- Complete app creation
Data Production Methods:
Method 1: Separate API Calls (Slower)
# First, insert data
api.insert_data(
    provider='my-provider',
    dataset='my-dataset',
    data=data,
)

# Then, produce messages
api.produce_messages(data=data)
Method 2: Combined API Call (Recommended)
# Insert data and produce messages in one call
api.insert_data(
    provider='my-provider',
    dataset='my-dataset',
    data=data,
    produce=True  # Enable message production
)
Examples
Example 1: Drilling Data Processing Pipeline
Followable App - Raw Data Processor:
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def drilling_data_processor(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    """
    Processes raw drilling data and calculates basic parameters
    """
    # Get raw drilling data
    raw_data = get_raw_drilling_data(event.asset_id, event.start_time, event.end_time)

    processed_records = []
    for record in raw_data:
        # Calculate drilling parameters
        processed_record = {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': record['timestamp'],
            'data': {
                'depth': record['depth'],
                'rop': calculate_rop(record),
                'wob': record['weight_on_bit'],
                'rpm': record['rotary_speed'],
                'torque': record['torque'],
                'flow_rate': record['mud_flow_rate'],
                'processed_at': int(time.time())
            }
        }
        processed_records.append(processed_record)

    # Store processed data and trigger following apps
    api.insert_data(
        provider='drilling-analytics',
        dataset='processed-drilling-data',
        data=processed_records,
        produce=True
    )

    return f"Processed {len(processed_records)} drilling records"
Following App - Performance Analyzer:
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def drilling_performance_analyzer(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    """
    Analyzes processed drilling data for performance metrics
    """
    # Get processed data from the followable app
    processed_data = api.get_dataset(
        provider='drilling-analytics',
        dataset='processed-drilling-data',
        query={
            'asset_id': event.asset_id,
            'company_id': event.company_id,
            'timestamp': {'$gte': event.start_time, '$lte': event.end_time}
        },
        sort={'timestamp': 1}
    )

    if not processed_data:
        return "No processed data available"

    # Calculate performance metrics
    performance_metrics = []
    for record in processed_data:
        data = record['data']

        # Calculate drilling efficiency
        efficiency_score = calculate_drilling_efficiency(
            data['rop'], data['wob'], data['rpm'], data['torque']
        )

        # Detect anomalies
        anomalies = detect_drilling_anomalies(data)

        performance_record = {
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': record['timestamp'],
            'data': {
                'efficiency_score': efficiency_score,
                'anomalies': anomalies,
                'recommendations': generate_recommendations(data, efficiency_score),
                'analyzed_at': int(time.time())
            }
        }
        performance_metrics.append(performance_record)

    # Store performance analysis
    api.insert_data(
        provider='drilling-analytics',
        dataset='drilling-performance',
        data=performance_metrics,
        produce=True  # This could trigger alert systems
    )

    return f"Analyzed performance for {len(performance_metrics)} records"
Example 2: Real-time Well Monitoring
Followable App - Sensor Data Aggregator:
import time

from corva import Api, Cache, StreamTimeEvent, stream

@stream
def sensor_data_aggregator(event: StreamTimeEvent, api: Api, cache: Cache):
    """
    Aggregates and validates real-time sensor data
    """
    aggregated_data = []

    for record in event.records:
        # Validate and clean sensor data
        if validate_sensor_data(record.data):
            aggregated_record = {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': record.timestamp,
                'data': {
                    'pressure': record.data.get('pressure'),
                    'temperature': record.data.get('temperature'),
                    'flow_rate': record.data.get('flow_rate'),
                    'vibration': record.data.get('vibration'),
                    'quality_score': calculate_data_quality(record.data),
                    'aggregated_at': int(time.time())
                }
            }
            aggregated_data.append(aggregated_record)

    if aggregated_data:
        # Store aggregated data and trigger following apps
        api.insert_data(
            provider='well-monitoring',
            dataset='aggregated-sensor-data',
            data=aggregated_data,
            produce=True
        )

    return f"Aggregated {len(aggregated_data)} sensor records"
Following App - Alert System:
import time

from corva import Api, Cache, StreamTimeEvent, stream

# Example threshold values; in practice these would be tuned per asset or loaded from app settings
PRESSURE_THRESHOLD = 5000
TEMPERATURE_THRESHOLD = 300
QUALITY_THRESHOLD = 0.8

@stream
def well_alert_system(event: StreamTimeEvent, api: Api, cache: Cache):
    """
    Monitors aggregated data for alert conditions
    """
    alerts_generated = []

    for record in event.records:
        data = record.data

        # Check for alert conditions
        alerts = []

        if data['pressure'] > PRESSURE_THRESHOLD:
            alerts.append({
                'type': 'HIGH_PRESSURE',
                'severity': 'CRITICAL',
                'message': f"Pressure {data['pressure']} exceeds threshold {PRESSURE_THRESHOLD}"
            })

        if data['temperature'] > TEMPERATURE_THRESHOLD:
            alerts.append({
                'type': 'HIGH_TEMPERATURE',
                'severity': 'WARNING',
                'message': f"Temperature {data['temperature']} exceeds threshold {TEMPERATURE_THRESHOLD}"
            })

        if data['quality_score'] < QUALITY_THRESHOLD:
            alerts.append({
                'type': 'POOR_DATA_QUALITY',
                'severity': 'INFO',
                'message': f"Data quality score {data['quality_score']} below threshold"
            })

        if alerts:
            alert_record = {
                'asset_id': event.asset_id,
                'version': 1,
                'timestamp': record.timestamp,
                'data': {
                    'alerts': alerts,
                    'sensor_data': data,
                    'generated_at': int(time.time())
                }
            }
            alerts_generated.append(alert_record)

    if alerts_generated:
        # Store alerts
        api.insert_data(
            provider='well-monitoring',
            dataset='well-alerts',
            data=alerts_generated,
            produce=True  # Could trigger notification systems
        )

        # Send immediate notifications for critical alerts
        for alert_record in alerts_generated:
            for alert in alert_record['data']['alerts']:
                if alert['severity'] == 'CRITICAL':
                    send_immediate_notification(alert, event.asset_id)

    return f"Generated {len(alerts_generated)} alert records"
Best Practices
1. Data Design
- Consistent Schema: Ensure data structure consistency across followable and following apps (see the schema guard sketched after this list)
- Versioning: Use version fields to handle schema evolution
- Timestamps: Always include accurate timestamps for proper event ordering
- Asset Context: Include asset_id and company_id for proper data isolation
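To make these points concrete, below is a minimal sketch of a schema guard that a followable app could run before publishing. The required keys mirror the record structure used throughout this guide; the helper name and sample values are illustrative and not part of the Corva SDK.
# Minimal schema guard for records produced by a followable app
# (illustrative helper, not part of the Corva SDK)
REQUIRED_KEYS = {'asset_id', 'version', 'timestamp', 'data'}

def is_valid_record(record: dict) -> bool:
    # Every record needs the four top-level keys used in this guide
    if not REQUIRED_KEYS.issubset(record):
        return False
    # Timestamps must be numeric so following apps can order events correctly
    if not isinstance(record['timestamp'], (int, float)):
        return False
    # The payload itself lives under 'data'
    return isinstance(record['data'], dict)

# Example usage with a hypothetical record
sample = {'asset_id': 1234, 'version': 1, 'timestamp': 1700000000, 'data': {'rop': 45.2}}
assert is_valid_record(sample)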
2. Performance Optimization
- Batch Processing: Process data in batches when possible
- Efficient Queries: Use proper indexing and query optimization
- Data Size Management: Avoid producing excessively large data payloads (see the chunking sketch after this list)
- Caching Strategy: Implement caching for frequently accessed data
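As a sketch of the batching and data-size points, the helper below splits records into payloads bounded by an approximate serialized size. The 500 KB default is an assumed example rather than a documented platform limit, and the function name is illustrative.
import json

def chunk_records(records, max_bytes=500_000):
    # Yield lists of records whose serialized size stays under max_bytes
    # (the default is an assumed example, not a platform limit)
    batch, size = [], 0
    for record in records:
        record_size = len(json.dumps(record))
        if batch and size + record_size > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append(record)
        size += record_size
    if batch:
        yield batch

# Each chunk can then be passed to api.insert_data(..., produce=True) separately.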
3. Error Handling
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def robust_followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    try:
        # Main processing logic
        data = process_data(event)

        # Validate data before publishing
        if validate_data(data):
            api.insert_data(
                provider='your-provider',
                dataset='your-dataset',
                data=data,
                produce=True
            )
            return "Success"
        else:
            raise ValueError("Data validation failed")

    except Exception as e:
        # Log error for debugging
        api.log_error(f"Followable app error: {str(e)}")

        # Optionally, publish error information
        error_data = [{
            'asset_id': event.asset_id,
            'version': 1,
            'timestamp': int(time.time()),
            'data': {
                'error': str(e),
                'event_info': str(event)
            }
        }]
        api.insert_data(
            provider='your-provider',
            dataset='error-log',
            data=error_data
        )

        raise  # Re-raise to ensure proper error handling
4. Monitoring and Logging
import time

from corva import Api, Cache, ScheduledDataTimeEvent, scheduled

@scheduled
def monitored_followable_app(event: ScheduledDataTimeEvent, api: Api, cache: Cache):
    start_time = time.time()

    try:
        # Log app start
        api.log_info(f"Starting followable app for asset {event.asset_id}")

        # Process data
        data = process_data(event)

        # Log processing metrics
        processing_time = time.time() - start_time
        api.log_info(f"Processed {len(data)} records in {processing_time:.2f} seconds")

        # Publish data
        api.insert_data(
            provider='your-provider',
            dataset='your-dataset',
            data=data,
            produce=True
        )

        # Log success
        api.log_info(f"Successfully published data for {len(data)} records")

        return f"Processed {len(data)} records"

    except Exception as e:
        # Log error with context
        api.log_error(f"Followable app failed: {str(e)}", extra={
            'asset_id': event.asset_id,
            'event_type': type(event).__name__,
            'processing_time': time.time() - start_time
        })
        raise
Troubleshooting
Common Issues and Solutions
1. Following Apps Not Triggering
Symptoms:
- Followable app runs successfully
- Data is stored in dataset
- Following apps don't execute
Solutions:
# Ensure produce=True is set
api.insert_data(
    provider='your-provider',
    dataset='your-dataset',
    data=data,
    produce=True  # This is crucial!
)

# Or use separate produce_messages call
api.insert_data(provider='your-provider', dataset='your-dataset', data=data)
api.produce_messages(data=data)
Checklist:
- produce=True flag is set in insert_data()
- Following apps have correct segment and log type
- Followable app configuration is properly saved
- Dataset has correct write permissions
2. Data Schema Mismatches
Symptoms:
- Following apps receive unexpected data structure
- Processing errors in following apps
Solutions:
import time

# Standardize data structure
def standardize_data_format(raw_data, asset_id):
    return {
        'asset_id': asset_id,
        'version': 1,  # Always include version
        'timestamp': int(raw_data.get('timestamp', time.time())),
        'data': {
            # Your actual data here
            'value1': raw_data.get('value1'),
            'value2': raw_data.get('value2'),
            # Include metadata
            'source': 'followable_app_name',
            'processed_at': int(time.time())
        }
    }
3. Performance Issues
Symptoms:
- Slow app execution
- Following apps timing out
- Large data volumes causing issues
Solutions:
# Implement batching
def process_in_batches(data, batch_size=100):
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        yield batch

# Process and publish in batches
for batch in process_in_batches(large_dataset):
    processed_batch = process_batch(batch)
    api.insert_data(
        provider='your-provider',
        dataset='your-dataset',
        data=processed_batch,
        produce=True
    )
4. Debugging Following Relationships
Debug Steps:
Verify App Configuration:
# In the following app, log the data it receives
api.log_info(f"Received {len(event.records)} records from followable app")
for record in event.records:
    api.log_debug(f"Record data: {record.data}")
Check Dataset Contents:
# Query the dataset directly
stored_data = api.get_dataset(
    provider='your-provider',
    dataset='your-dataset',
    query={'asset_id': event.asset_id},
    sort={'timestamp': -1},
    limit=10
)
api.log_info(f"Found {len(stored_data)} records in dataset")
Validate Event Flow:
# Add comprehensive logging
@stream
def debug_following_app(event: StreamTimeEvent, api: Api, cache: Cache):
    api.log_info(f"Following app triggered with {len(event.records)} records")
    api.log_info(f"Event asset_id: {event.asset_id}")
    api.log_info(f"Record timestamps: {event.records[0].timestamp} to {event.records[-1].timestamp}")
    for i, record in enumerate(event.records):
        api.log_debug(f"Record {i}: {record.data}")
Error Messages and Solutions
| Error Message | Cause | Solution |
|---|---|---|
| "No following apps configured" | App not marked as followable | Enable followable toggle in DevCenter |
| "Dataset write permission denied" | Insufficient permissions | Check dataset permissions in settings |
| "Invalid data format" | Malformed data structure | Validate data schema before publishing |
| "Following app timeout" | Processing takes too long | Optimize following app logic, implement batching |
| "Circular dependency detected" | App following creates loop | Review app following relationships |
Performance Monitoring
import time
from functools import wraps

from corva import scheduled

def monitor_performance(func):
    @wraps(func)
    def wrapper(event, api, cache):
        start_time = time.time()
        try:
            result = func(event, api, cache)
            duration = time.time() - start_time
            # Log performance metrics
            api.log_info(f"{func.__name__} completed in {duration:.2f}s")
            return result
        except Exception as e:
            duration = time.time() - start_time
            api.log_error(f"{func.__name__} failed after {duration:.2f}s: {str(e)}")
            raise
    return wrapper

@scheduled
@monitor_performance  # Apply inside @scheduled so the wrapper sees (event, api, cache)
def monitored_followable_app(event, api, cache):
    # Your app logic here
    pass
Conclusion
Followable Apps are a powerful feature in the Corva platform that enable the creation of sophisticated, event-driven data processing pipelines. By understanding the concepts, best practices, and implementation patterns outlined in this documentation, you can build robust and scalable applications that leverage the full potential of the Corva ecosystem.
Key Takeaways:
- Chain Reactions: Followable apps create automated workflows where data flows seamlessly between applications
- Real-time Processing: Enable real-time analytics and immediate response to data events
- Modular Architecture: Break complex processing into specialized, maintainable components
- Scalable Design: Handle large volumes of data through efficient processing patterns
Next Steps:
- Review the Corva Python SDK documentation for additional details
- Explore the Corva DevCenter for platform-specific guidance
- Start with simple followable app examples before building complex workflows
- Implement comprehensive testing and monitoring for production deployments
For additional support and examples, visit the Corva DevCenter Documentation.